Tidy up console destruction on domain exit.
"""
import string
+from twisted.internet import reactor
+
# subscribe a.b.c h: map a.b.c -> h
# subscribe a.b.* h: map a.b.* -> h
# subscribe a.b.? h: map a.b.? -> h
"""
if event == None:
self.handlers.clear()
- else:
+ elif event in self.handlers:
del self.handlers[event]
def unsubscribe(self, event, handler):
if handler in hl:
hl.remove(handler)
- def inject(self, event, val):
- """Inject an event. Handlers for it are called if runing, otherwise
+ def inject(self, event, val, async=1):
+ """Inject an event. Handlers for it are called if running, otherwise
it is queued.
event event type
val event value
"""
if self.run:
- #print ">event", event, val
- self.call_event_handlers(event, event, val)
- self.call_query_handlers(event, val)
- self.call_star_handlers(event, val)
+ if async:
+ reactor.callLater(0, self.call_handlers, event, val)
+ else:
+ self.notify_handlers(event, val)
else:
self.queue.append( (event, val) )
+ def call_handlers(self, event, val):
+ """Internal method to call event handlers.
+ """
+ #print ">event", event, val
+ self.call_event_handlers(event, event, val)
+ self.call_query_handlers(event, val)
+ self.call_star_handlers(event, val)
+
def call_event_handlers(self, key, event, val):
"""Call the handlers for an event.
It is safe for handlers to subscribe or unsubscribe.
def __init__(self, console, dom1, port1, dom2, port2, conn=None):
self.console = console
- self.dom1 = dom1
- self.port1 = port1
- self.dom2 = dom2
- self.port2 = port2
+ self.dom1 = int(dom1)
+ self.port1 = int(port1)
+ self.dom2 = int(dom2)
+ self.port2 = int(port2)
self.conn = conn
#self.id = "%d.%d-%d.%d" % (self.dom1, self.port1, self.dom2, self.port2)
self.id = str(port1)
print 'XendConsole> rebooted: removing all console info'
self.rm_all()
eserver.subscribe('xend.domain.died', self.onDomainDied)
+ eserver.subscribe('xend.domain.destroy', self.onDomainDied)
def rm_all(self):
"""Remove all console info. Used after reboot.
self._delete_console(c.id)
def onDomainDied(self, event, val):
- print 'onDomainDied', "dom=", dom,
dom = int(val)
+ #print 'XendConsole>onDomainDied', 'event', event, "dom=", dom
for c in self.consoles():
- print 'onDomainDied', "dom=", dom, "dom1=", c.dom1, "dom2=", c.dom2
+ #print 'onDomainDied', "dom=", dom, "dom1=", c.dom1, "dom2=", c.dom2
if (c.dom1 == dom) or (c.dom2 == dom):
+ 'XendConsole>onDomainDied', 'delete console dom=', dom
+ ctrl = xcd.get_domain_console(dom)
+ if ctrl:
+ ctrl.close()
self._delete_console(c.id)
def sync(self):
self._delete_domain(domid)
deferred = defer.DeferredList(dlist, fireOnOneErrback=1)
def cbok(val):
- print "doms:"
- for d in self.domain.values(): print 'dom', d
- print "refresh..."
+ #print "doms:"
+ #for d in self.domain.values(): print 'dom', d
self.refresh()
- print "doms:"
+ print "XendDomain>initial_refresh> doms:"
for d in self.domain.values(): print 'dom', d
deferred.addCallback(cbok)
self.db.delete(id)
def reap(self):
- print 'reap>'
+ """Go through the domains looking for ones that have crashed or stopped.
+ Tidy them up.
+ """
+ print 'XendDomain>reap>'
domlist = xc.domain_getinfo()
casualties = []
for d in domlist:
- print 'dom', d
+ #print 'dom', d
dead = 0
dead = dead or (d['crashed'] or d['shutdown'])
dead = dead or (d['dying'] and
casualties.append(d)
for d in casualties:
id = str(d['dom'])
- print 'died> id=', id, d
+ print 'XendDomain>reap> died id=', id, d
dominfo = self.domain.get(id)
if not dominfo: continue
dominfo.died()
self.domain_destroy(id, refresh=0)
- print 'reap<'
+ print 'XendDomain>reap<'
def refresh(self):
"""Refresh domain list from Xen.
try:
self._delete_domain(id)
except:
+ print 'refresh_domain: error'
+ raise
pass
else:
d = self.domain.get(id)
try:
info = self.xconsole.console_get(x)
val = SrvConsole(info)
- except KeyError:
+ except KeyError, ex:
+ print 'SrvConsoleDir>', ex
pass
return val
return ['ok']
def op_info(self, name, req):
- val = self.daemon.consoles()
+ val = ['info']
+ val += self.daemon.consoles()
+ val += self.daemon.blkifs()
+ val += self.daemon.netifs()
return val
def op_sys_subscribe(self, name, v):
d = self.blkifCF.createInstance(dom, recreate=recreate)
return d
+ def blkifs(self):
+ return [ x.sxpr() for x in self.blkifCF.getInstances() ]
+
def blkif_get(self, dom):
return self.blkifCF.getInstanceByDom(dom)
"""
return self.netifCF.createInstance(dom, recreate=recreate)
+ def netifs(self):
+ return [ x.sxpr() for x in self.netifCF.getInstances() ]
+
def netif_get(self, dom):
return self.netifCF.getInstanceByDom(dom)
console = self.get_console(id)
if not console:
raise ValueError('Invalid console id')
- if console.conn:
- console.conn.loseConnection()
+ console.disconnect()
def domain_shutdown(self, dom, reason):
"""Shutdown a domain.
try:
dom = self.xd.domain_get(x)
val = SrvDomain(dom)
- except KeyError:
+ except KeyError, ex:
+ print 'SrvDomainDir>', ex
pass
return val
config = pin.get_val()
ok = 1
except Exception, ex:
- print ex
+ print 'op_create>', ex
if not ok:
req.setResponseCode(http.BAD_REQUEST, "Invalid configuration")
return "Invalid configuration"
"""Get the channel for the given domain.
Construct if necessary.
"""
+ dom = int(dom)
for chan in self.channels.values():
if not isinstance(chan, Channel): continue
if chan.dom == dom:
"""
BaseChannel.__init__(self, factory)
# Domain.
- self.dom = dom
+ self.dom = int(dom)
# Domain port (object).
self.port = self.factory.createPort(dom)
# Channel port (int).
self.devs_by_type = {}
# Output queue.
self.queue = []
+ self.closed = 0
def getLocalPort(self):
"""Get the local port.
"""Close the channel. Calls lostChannel() on all its devices and
channelClosed() on the factory.
"""
+ self.closed = 1
for d in self.devs:
d.lostChannel()
self.factory.channelClosed(self)
- del self.devs
- del self.devs_by_type
+ self.devs = []
+ self.devs_by_type = {}
def registerDevice(self, types, dev):
"""Register a device controller.
@param types message types the controller handles
@param dev device controller
"""
+ if self.closed: return
self.devs.append(dev)
for ty in types:
self.devs_by_type[ty] = dev
- def unregisterDevice(self, dev):
+ def deregisterDevice(self, dev):
"""Remove the registration for a device controller.
@param dev device controller
"""
- self.devs.remove(dev)
- types = [ ty for (ty, d) in self.devs_by_type.items()
- if d == dev ]
+ if dev in self.devs:
+ self.devs.remove(dev)
+ types = [ ty for (ty, d) in self.devs_by_type.items() if d == dev ]
for ty in types:
- del devs_by_type[ty]
+ del self.devs_by_type[ty]
def getDevice(self, type):
"""Get the device controller handling a message type.
"""
def __init__(self, factory, dom, console_port):
- #print 'ConsoleController> dom=', dom
+ #print 'ConsoleController> dom=', dom, type(dom)
controller.Controller.__init__(self, factory, dom)
self.majorTypes = [ CMSG_CONSOLE ]
self.status = "new"
def sxpr(self):
val =['console',
+ ['status', self.status ],
['id', self.idx ],
['domain', self.dom ],
['local_port', self.channel.getLocalPort() ],
return self.status == 'connected'
def close(self):
- self.status = "closed"
- self.listener.stopListening()
- self.deregisterChannel()
- self.lostChannel()
+ try:
+ #print 'ConsoleController> close dom=', self.dom
+ self.status = "closed"
+ if self.conn:
+ self.conn.loseConnection()
+ self.listener.stopListening()
+ self.deregisterChannel()
+ self.lostChannel()
+ except Exception, ex:
+ print 'ConsoleController>close>', ex
+ raise
def listen(self):
"""Listen for TCP connections to the console port..
return 0
def disconnect(self):
+ if self.conn:
+ self.conn.loseConnection()
self.addr = None
self.conn = None
self.listen()
pass
def registerChannel(self):
- print 'CtrlMsgRcvr>registerChannel>', self
+ #print 'CtrlMsgRcvr>registerChannel>', self
self.channel = self.channelFactory.domChannel(self.dom)
self.idx = self.channel.getIndex()
if self.majorTypes:
self.channel.registerDevice(self.majorTypes, self)
def deregisterChannel(self):
- print 'CtrlMsgRcvr>deregisterChannel>', self
+ #print 'CtrlMsgRcvr>deregisterChannel>', self
if self.channel:
self.channel.deregisterDevice(self)
del self.channel
return None
def delInstance(self, instance):
+ #print 'ControllerFactory>delInstance>', instance.idx
if instance.idx in self.instances:
+ #print 'ControllerFactory>delInstance> remove', instance.idx
del self.instances[instance.idx]
def createInstance(self, dom, recreate=0):
raise NotImplementedError()
def instanceClosed(self, instance):
+ #print 'ControllerFactory>instanceClosed>', instance.idx, instance
self.delInstance(instance)
def addDeferred(self):
def __init__(self, factory, dom):
CtrlMsgRcvr.__init__(self)
self.factory = factory
- self.dom = dom
+ self.dom = int(dom)
self.channel = None
self.idx = None
self.lostChannel()
def lostChannel(self):
+ #print 'Controller>lostChannel>', self, self.factory
self.factory.instanceClosed(self)
class Dev: